[core] Add timeout and asyncio for internal kv. #25126

Merged
merged 8 commits into from
May 26, 2022

Conversation

fishbone
Contributor

Why are these changes needed?

This PR adds timeout and asyncio support to the internal KV. For now it applies only to gcs_utils and not to Ray Client, since this is purely for Ray-internal use.

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@fishbone fishbone marked this pull request as draft May 24, 2022 05:03
@fishbone fishbone marked this pull request as ready for review May 24, 2022 05:25
Collaborator

@jjyao jjyao left a comment

This is great!

Also update internal_kv.py to support timeout and asyncio?
A side question, do we need internal_kv.py or should people use gcs_utils.py directly?

@fishbone
Contributor Author

fishbone commented May 24, 2022

Hi @jjyao, thank you for the review, especially while you are off.

Some context here: internal_kv.py is more complicated than this one, so we won't include it in this PR for now. internal_kv.py has to work with a front-side change in Ray Client (client -> client worker -> client server -> internal_kv) :(

Also, the timeout is confusing there, since client worker -> client server is one gRPC call and client server -> internal kv is another gRPC call.

This API is only supposed to be used internally for now. The end-user should only use internal_kv.py.

I strongly feel there is a gap here: because of Ray Client, it's becoming very complicated to support even a simple feature.

So I plan to keep it as it is for now. We can add the feature if users ask for it and we have a better picture of the requirements.

@@ -208,10 +209,12 @@ def address(self):
         return self._channel._gcs_address

     @_auto_reconnect
-    def internal_kv_get(self, key: bytes, namespace: Optional[bytes]) -> bytes:
+    def internal_kv_get(
+        self, key: bytes, namespace: Optional[bytes], timeout: Optional[int] = None
Member

Should the type annotation be Optional[float]? Also I'm not a fan of gRPC's naming of this parameter. Should it be timeout_s to indicate the unit?

Contributor Author

I feel that in Python, whenever there is a timeout, it's always in seconds. Is that the conventional way to do this in Python? I'll change the type to float.

Member

Sounds good. We can go with the convention.
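
As a small illustration of the seconds convention agreed on above: Python's standard library takes timeouts as (possibly fractional) seconds, which is why a float annotation fits. The sketch below uses asyncio.wait_for; slow_lookup and lookup_with_timeout are hypothetical names for illustration and are not part of this PR.

```python
import asyncio


async def slow_lookup() -> bytes:
    # Hypothetical stand-in for a slow RPC.
    await asyncio.sleep(1.0)
    return b"value"


async def lookup_with_timeout(timeout: float) -> str:
    try:
        # asyncio.wait_for takes its timeout in seconds; fractions are allowed.
        await asyncio.wait_for(slow_lookup(), timeout=timeout)
        return "ok"
    except asyncio.TimeoutError:
        return "timed out"


# 0.05 s is far shorter than the 1 s sleep, so the call times out.
result = asyncio.run(lookup_with_timeout(0.05))
print(result)
```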

assert gcs_client.internal_kv_put(b"A", b"C", False, b"NS") == 0
assert gcs_client.internal_kv_get(b"A", b"NS") == b"B"
assert gcs_client.internal_kv_put(b"A", b"C", True, b"NS") == 0
assert gcs_client.internal_kv_get(b"A", b"NS") == b"C"
Member

Add a test for getting non-existent key?

@jjyao
Collaborator

jjyao commented May 24, 2022

> This API is only supposed to be used internally for now. The end-user should only use internal_kv.py.

Who is end-user here? Should usage_lib.py use this API or internal_kv.py?

@fishbone
Contributor Author

> This API is only supposed to be used internally for now. The end-user should only use internal_kv.py.

> Who is end-user here? Should usage_lib.py use this API or internal_kv.py?

The end-user should be a component related to Ray core; it's an internal API. If usage_lib.py can run in client mode, you should use internal_kv.py. In the long term, Ray Client shouldn't use the internal KV and should always call the API server (dashboard) to forward traffic to GCS (or GCS request -> client server -> GCS).

@mwtian
Member

mwtian commented May 24, 2022

Btw, do we plan to switch the internal kv calls in dashboard to the async versions?

@jjyao
Collaborator

jjyao commented May 24, 2022

usage_lib.py can run in client mode, so I guess I have to use internal_kv.py. I do want the asyncio feature, though, since the caller runs in the asyncio event loop. Currently I have to wrap the sync internal_kv calls in a separate thread to make them work with asyncio.
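
The thread-wrapping workaround described above can be sketched roughly as follows. This is only an illustration of the general pattern with loop.run_in_executor; blocking_kv_get is a hypothetical stand-in for a synchronous KV call and does not reflect how usage_lib.py is actually written.

```python
import asyncio
import time
from typing import Optional


def blocking_kv_get(key: bytes) -> Optional[bytes]:
    # Hypothetical stand-in for a synchronous internal KV call.
    time.sleep(0.01)
    return {b"A": b"B"}.get(key)


async def kv_get_async(key: bytes) -> Optional[bytes]:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool, so the
    # blocking call does not stall the event loop.
    return await loop.run_in_executor(None, blocking_kv_get, key)


async def main():
    return await asyncio.gather(kv_get_async(b"A"), kv_get_async(b"missing"))


values = asyncio.run(main())
print(values)
```

A native async client avoids the extra thread hop, which is the motivation for the asyncio support discussed in this thread.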

@fishbone
Contributor Author

> Btw, do we plan to switch the internal kv calls in dashboard to the async versions?

Yes, I'll do that in the next PR.

@fishbone
Contributor Author

@jjyao let me know if I'm wrong here. I feel usage_lib.py always assumes it is not running in a Ray client.
https://github.com/ray-project/ray/blob/master/python/ray/_private/usage/usage_lib.py#L187-L188

ray.worker.global_worker.mode is something that only lives on the server side.

In this case, if you need an async API, you can create an async client (check the test code for reference).

What I mean is that you should use gcs_utils.py if you don't need it to run on the client-side in ray client.

@fishbone fishbone merged commit dd3a43b into ray-project:master May 26, 2022
        if reply.status.code == GcsCode.OK:
            return reply.value
        elif reply.status.code == GcsCode.NotFound:
            return None
Contributor

This is not an Awaitable[bytes]. The caller has to check that the return value is not None before calling await on the return object. It is better to return an Awaitable here that indicates the None situation (e.g., empty bytes)

Contributor Author

I think once the function is an async function, it doesn't matter here. The return value is wrapped in an awaitable automatically:

(base) yic@ip-172-31-58-40:~ $ cat x.py
import asyncio

async def f():
    return None

async def main():
    v = await f()
    print("v=", v)

asyncio.run(main())
(base) yic@ip-172-31-58-40:~ $ python x.py
v= None
(base) yic@ip-172-31-58-40:~ $

        reply = await self._kv_stub.InternalKVPut(req, timeout=timeout)
        if reply.status.code == GcsCode.OK:
            return reply.added_num
        else:
Contributor

What about the NotFound case?

Contributor Author

@fishbone fishbone May 27, 2022

I think the module lacks good documentation here. Let me put it in another PR.

NotFound won't be returned here, since this is a PUT operation and the return value is the number of keys that were added. The return value is weird, but that is for legacy reasons: the first version of internal kv defined it this way, and we need to stay backward compatible for now.

If NotFound is returned, it means something is wrong (a bug).
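
To make the PUT return value concrete, here is a toy in-memory model. FakeKV is entirely hypothetical and is not Ray's implementation; it only mirrors the behaviour visible in this PR's test assertions (a put on an existing key returns 0 whether or not it overwrites). That a put on a brand-new key returns 1 is an assumption drawn from the "number of keys added" description.

```python
from typing import Dict, Optional, Tuple


class FakeKV:
    """Hypothetical in-memory model of the put/get semantics."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[bytes, bytes], bytes] = {}

    def put(self, key: bytes, value: bytes, overwrite: bool, namespace: bytes) -> int:
        existed = (namespace, key) in self._data
        if overwrite or not existed:
            self._data[(namespace, key)] = value
        # The result counts newly added keys; it is not a success flag.
        # Assumption: a brand-new key yields 1.
        return 0 if existed else 1

    def get(self, key: bytes, namespace: bytes) -> Optional[bytes]:
        # A missing key yields None rather than raising.
        return self._data.get((namespace, key))


kv = FakeKV()
first = kv.put(b"A", b"B", False, b"NS")    # new key
second = kv.put(b"A", b"C", False, b"NS")   # exists, no overwrite
after_second = kv.get(b"A", b"NS")
third = kv.put(b"A", b"C", True, b"NS")     # exists, overwrite
after_third = kv.get(b"A", b"NS")
```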

        elif reply.status.code == GcsCode.NotFound:
            return None
        else:
            raise RuntimeError(
Contributor

Is this idiomatic in Python? To throw exceptions from a function that returns an Awaitable? In folly::future, you have the thenError() method to deal with async exception propagation. The way this method is written currently, the caller has to be ready for a future in the normal case, None in some cases, or an exception.

Contributor Author

Yes, this is valid. I feel the reason folly::future does it that way is that there is no coroutine support in C++. This is more like async operations in Hack.
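
A minimal sketch of the point being made: an exception raised inside an async function surfaces at the await site, so an ordinary try/except handles it, and None is simply the awaited value. The names kv_get and describe are hypothetical and only loosely modelled on the method under review.

```python
import asyncio
from typing import List, Optional


async def kv_get(key: bytes) -> Optional[bytes]:
    # Hypothetical stand-in: value, None for a missing key, or an exception.
    if key == b"bad":
        raise RuntimeError("simulated failure")
    return {b"A": b"B"}.get(key)


async def describe(key: bytes) -> str:
    try:
        # The exception, if any, is raised here, at the await.
        value = await kv_get(key)
    except RuntimeError as e:
        return f"error: {e}"
    return "missing" if value is None else value.decode()


async def main() -> List[str]:
    return [await describe(k) for k in (b"A", b"nope", b"bad")]


outcomes = asyncio.run(main())
print(outcomes)
```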

address=ray.worker.global_worker.gcs_client.address
)

assert await gcs_client.internal_kv_get(b"A", b"NS") is None
Contributor Author

@sriram-anyscale the None case is also tested here

@pcmoritz
Contributor

@iycheng Thanks for explaining! I also had a quick look at the PR, and I think the code is correct but the type signatures are not, can you make a follow up PR to fix them?

Here is what I mean:
The typing of async functions is explained in https://mypy.readthedocs.io/en/stable/more_types.html#typing-async-await. In your case it should be

async def internal_kv_get(
        self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
    ) -> bytes: # And actually, this should be Optional[bytes] since the function can return None in the error case

and not

async def internal_kv_get(
        self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
    ) -> Awaitable[bytes]:

While a call result = client.internal_kv_get(bytes, ...) will have result be of type Awaitable[bytes] , the thing that is returned in the async method body is of type bytes (and then because it is an async, on the caller side of that async method it will be Awaitable[bytes] and needs to be awaited).

I'm pretty sure that is the case, let me know if you don't think so! If it is incorrect please make a follow up PR to fix it since it can be very confusing to other people who read the code in the future :)
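
The distinction can be checked with a small, self-contained sketch. Here kv_get is a hypothetical free function rather than the real client method: it is annotated with the type of the value the body returns, while the call expression itself produces an awaitable.

```python
import asyncio
import inspect
from typing import Optional, get_type_hints


async def kv_get(key: bytes) -> Optional[bytes]:
    # Annotated with what the body returns, not with Awaitable[...].
    return {b"A": b"B"}.get(key)


async def main():
    pending = kv_get(b"A")                    # calling yields a coroutine object
    is_awaitable = inspect.isawaitable(pending)
    found = await pending                     # awaiting yields the bytes
    missing = await kv_get(b"Z")              # awaiting yields None
    return is_awaitable, found, missing


checks = asyncio.run(main())
declared = get_type_hints(kv_get)["return"]
print(checks, declared)
```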

    ) -> Awaitable[bytes]:
        logger.debug(f"internal_kv_get {key} {namespace}")
        req = gcs_service_pb2.InternalKVGetRequest(namespace=namespace, key=key)
        reply = await self._kv_stub.InternalKVGet(req, timeout=timeout)
Contributor

@sriram-anyscale sriram-anyscale May 29, 2022

I just noticed this "await" - doesn't this mean this statement blocks until the result is ready?

I think I get it now - it is the return type that is confusing - you are not really returning an Awaitable - you are just allowing the interpreter to do other things while "await"ing. So Philipp's comments are what you need to fix.

@fishbone
Contributor Author

fishbone commented Jun 2, 2022

@pcmoritz @sriram-anyscale
Sorry, I missed the comments on this one. I get it now. I saw it has already been fixed in this PR. Thanks for the fix.

@fishbone fishbone deleted the kv-timeout branch June 2, 2022 03:14